package com.hy.kafka.consumer;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * Kafka consumer that logs the value of every record received on the
 * {@code TOPICNAME} topic as part of consumer group {@code hy}.
 *
 * <p>NOTE(review): the class is registered via {@code @Configuration} although no
 * {@code @Bean} methods are visible here; a plain component stereotype may have
 * been intended — confirm before changing.
 */
@Configuration
public class KafkaConsumer {

    /** Shared logger; kept {@code protected} so existing subclasses can still use it. */
    protected static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    /**
     * Topic listener: logs the value of each consumed record.
     *
     * <p>The value is rendered as normalized JSON when its string form parses to a
     * JSON object; otherwise (null value, non-JSON text, or a parse result of
     * {@code null}) the raw string form is logged instead.
     *
     * @param consumer the record delivered by the listener container
     */
    @KafkaListener(topics = "TOPICNAME", groupId = "hy")
    public void qzkReceive(ConsumerRecord<?, ?> consumer) {
        Object value = consumer.value();
        // String.valueOf yields "null" for a null value, matching what string
        // concatenation would have produced.
        String raw = String.valueOf(value);
        String rendered = raw;

        // Check for null explicitly rather than relying on a caught NullPointerException.
        if (value != null) {
            try {
                JSONObject jsonObject = JSONObject.parseObject(raw);
                // parseObject may return null (e.g. for blank input); fall back to the raw text.
                if (jsonObject != null) {
                    rendered = jsonObject.toJSONString();
                }
            } catch (Exception ignored) {
                // Deliberately ignored: kept for testing, where the input typed in
                // may not be valid JSON. The raw text is logged instead.
            }
        }

        logger.info("消费到了数据===：{}", rendered);
    }
}
